package com.bw.func;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.bw.bean.TableProcess;

import com.bw.utils.MySQLUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * @ClassName OrderWideDimAsyncFunction
 * @Description TODO   准备业务数据 DWD 层   实现动态分流功能封装  HBase
 * @Author SXLWTT 单新龙
 * @Date 2022/4
 * @Version 1.0
 **/
/*其中牵涉到别的包装的分流处理的函数为：我们知道，对于不同的数据我们不同的处理方法：
有的数据需要通过mysql 处理到hbase中去，有的需要处理到不同的kafka中去，其中我们可以通过配置表进行配置*/
public class TableProcessHoKFunction extends ProcessFunction<JSONObject,JSONObject> {
    //因为要将维度数据通过侧输出流输出，所以我们在这里定义一个侧输出流标记
    private OutputTag<JSONObject> outputTag;

    //用于在内存中存放配置表信息的Map <表名：操作,tableProcess>
    private Map<String, TableProcess> tableProcessMap = new HashMap<>();

    //实例化函数对象的时候，将侧输出流标签也进行赋值   主流存kafka
    public TableProcessHoKFunction(OutputTag<JSONObject> outputTag) {
        this.outputTag = outputTag;
    }

    //时间处理格式类，将字符时间转化为long
    SimpleDateFormat sdf = null;


    @Override
    public void open(Configuration parameters) throws Exception {
        //初始化配置表信息
        sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        refreshMeta();
    }

    private void refreshMeta() {
        //========1.从MySQL数据库配置表中查询配置信息============where source_table='base_category1' and operate_type='insert'
        System.out.println("查询配置表信息");
        List<TableProcess> tableProcessList = MySQLUtil.queryList("select * from table_process ", TableProcess.class, true);
        //对查询出来的结果集进行遍历
        for (TableProcess tableProcess : tableProcessList) {
            //获取源表表名
            String sourceTable = tableProcess.getSourceTable();
            //获取操作类型
            String operateType = tableProcess.getOperateType();
            //输出类型      hbase|kafka
            String sinkType = tableProcess.getSinkType();
            //输出目的地表名或者主题名
            String sinkTable = tableProcess.getSinkTable();
            //输出字段
            String sinkColumns = tableProcess.getSinkColumns();
            //表的主键
            String sinkPk = tableProcess.getSinkPk();
            //建表扩展语句
            String sinkExtend = tableProcess.getSinkExtend();
            //拼接保存配置的key
            String key = sourceTable +":"+ operateType;
//bin/kafka-console-consumer.sh --topic ods_base_db_m --bootstrap-server hadoop102:9092 --from-beginning
            //========2.将从配置表中查询到配置信息，保存到内存的map集合中=============
            tableProcessMap.put(key, tableProcess);
        }

        //如果没有从数据库的配置表中读取到数据
        if (tableProcessMap == null || tableProcessMap.size() == 0) {
            throw new RuntimeException("没有从数据库的配置表中读取到数据");
        }

        Iterator<Map.Entry<String, TableProcess>> iterator = tableProcessMap.entrySet().iterator();
        while (iterator.hasNext()){
            Map.Entry<String, TableProcess> next = iterator.next();
            System.out.println(next);
        }

    }





    @Override
    public void processElement(JSONObject jsonObj, Context ctx, Collector<JSONObject> out) throws Exception {

        //获取表名
        String table = jsonObj.getString("table");
        //获取操作类型
        String type = jsonObj.getString("type");
        if (tableProcessMap != null && tableProcessMap.size() > 0) {
            //根据表名和操作类型拼接key
            String key = table + ":" + type;
            System.out.println(key+"---");
            //从内存的配置Map中获取当前key对象的配置信息
            TableProcess tableProcess = tableProcessMap.get(key);

            //如果获取到了该元素对应的配置信息
            if (tableProcess != null) {
                //获取sinkTable，指明当前这条数据应该发往何处  如果是维度数据，那么对应的是phoenix中的表名；如果是事实数据，对应的是kafka的主题名
                jsonObj.put("sink_table", tableProcess.getSinkTable());
                String sinkColumns = tableProcess.getSinkColumns();

            } else {
                System.out.println("NO this Key:" + key + " in MySQL");
            }

            //根据sinkType，将数据输出到不同的流
            if(tableProcess != null && tableProcess.getSinkType().equals(TableProcess.SINK_TYPE_HBASE)){
                //如果sinkType = hbase ，说明是维度数据，通过侧输出流输出
                System.out.println("hbase $$$$$$$$$$$$$$$$$$$$$$$");
                ctx.output(outputTag,jsonObj);
            }else if(tableProcess != null && tableProcess.getSinkType().equals(TableProcess.SINK_TYPE_KAFKA)){
                //如果sinkType = kafka ，说明是事实数据，通过主流输出

                System.out.println("kafka  $$$$$$$$$$$$$$$$$$$$$$$");
                // 特殊说明：这里只有优惠券表没有创建时间，否则都把创建时间给转为为LONG
                if(!table.equalsIgnoreCase("coupon_use"))
                {
                    String data = jsonObj.getString("data");
                    JSONObject jsonDataObject = JSON.parseObject(data);
                    //特殊处理将可能的出现的处理时间直接改为long 时间
                    String create_time = jsonDataObject.getString("create_time");
                    if(create_time != null)
                    {
                        long time = sdf.parse(create_time).getTime();
                        //如果其中牵涉到时间。，我们把时间的构造转化为LONG 型 ，为将来做准备
                        jsonObj.getJSONObject("data").put("create_ts",time) ;
                    }
                }
                out.collect(jsonObj);
            }
        }
    }






}

